package com.atguigu.flink.datastreamapi.sink;

import com.alibaba.fastjson.JSON;
import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

/**
 * Created by Smexy on 2023/11/13

 选择oldapi
 */
public class Demo5_CustomRedisSink
{
    //把每种传感器最大的vc写出到redis
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

                env
                   .socketTextStream("hadoop102", 8888)
                   .map(new WaterSensorMapFunction())
                    .addSink(new MyRedisSinkFunction());


                try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

    }

    /*
        实现
     */
    private static class MyRedisSinkFunction extends RichSinkFunction<WaterSensor>
    {

        //来一条数据写出一次  key:id  value: json
        @Override
        public void invoke(WaterSensor value, Context context) throws Exception {
           jedis.set(value.getId(), JSON.toJSONString(value));
        }

        private Jedis jedis;

        //创建连接
        @Override
        public void open(Configuration parameters) throws Exception {
            jedis = new Jedis("hadoop102", 6379);
        }

        //关闭连接
        @Override
        public void close() throws Exception {
            jedis.close();
        }
    }
}
